package com.tpvlog.dfs.client;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.tpvlog.dfs.client.req.FileInfo;
import com.tpvlog.dfs.client.req.Host;
import com.tpvlog.dfs.client.resp.ResponseCallback;

import java.io.File;

/**
 * Client-side implementation of {@link FileSystem}.
 *
 * <p>Metadata operations are delegated to a {@link NameNodeRpcClient}; file
 * contents are transferred to and from DataNodes through an {@link NIOClient}.
 *
 * @author Ressmix
 */
public class FileSystemImpl implements FileSystem {

    /** RPC client used for all NameNode calls (mkdir, create file, DataNode allocation). */
    private final NameNodeRpcClient rpcClient;

    /** NIO client used to send file data to, and read file data from, DataNodes. */
    private final NIOClient nioClient;

    /** Creates a file system client with freshly constructed RPC and NIO clients. */
    public FileSystemImpl() {
        this.rpcClient = new NameNodeRpcClient();
        this.nioClient = new NIOClient();
    }

    /**
     * Creates a directory by delegating to the NameNode RPC client.
     *
     * @param path directory path to create
     * @throws Exception if the underlying RPC call fails
     */
    public void mkdir(String path) throws Exception {
        rpcClient.mkdir(path);
    }

    /**
     * Uploads a file: registers it with the NameNode, asks for the DataNodes to
     * write to, then sends the file to each of them in turn.
     *
     * <p>If sending to a DataNode fails, the failed node is reported to the
     * NameNode as {@code ip-hostname} and the send is retried once against the
     * replacement node it returns. A failure of that retry is not caught here.
     *
     * @param fileInfo description of the file to upload
     * @param callback passed through unchanged to {@code NIOClient.sendFile}
     * @return {@code true} when the file was sent to every allocated DataNode;
     *         {@code false} when the file could not be created, no DataNodes
     *         were allocated, or no replacement was available for a failed node
     */
    public Boolean upload(FileInfo fileInfo, ResponseCallback callback) {
        // 1. Register the file metadata with the NameNode.
        // NOTE(review): File.separator is platform dependent ("\\" on Windows) and the
        // normalized name is only used for createFile; later calls receive fileInfo
        // with its original filename - confirm both are intended.
        String filename = fileInfo.getFilename();
        if (!filename.startsWith(File.separator)) {
            filename = File.separator + filename;
        }
        if (!rpcClient.createFile(filename)) {
            return false;
        }

        // 2. Ask the NameNode which DataNodes should receive the file.
        String datanodesJson = rpcClient.allocateDataNodes(fileInfo, "");
        System.out.println(datanodesJson);
        if (datanodesJson == null) {
            return false;
        }

        // 3. Send the file to each allocated DataNode in turn.
        JSONArray datanodes = JSONArray.parseArray(datanodesJson);
        if (datanodes == null || datanodes.isEmpty()) {
            // Nothing to write to: reporting success here would claim an upload
            // that never happened.
            return false;
        }
        for (int i = 0; i < datanodes.size(); i++) {
            JSONObject datanode = datanodes.getJSONObject(i);
            String hostname = datanode.getString("hostname");
            String ip = datanode.getString("ip");
            int nioPort = datanode.getIntValue("nioPort");
            Host host = new Host(hostname, ip, nioPort);
            try {
                nioClient.sendFile(host, fileInfo, callback);
            } catch (Exception ex) {
                ex.printStackTrace();
                // Report the failed DataNode and ask for a healthy replacement.
                String reallocateDataNode = rpcClient.allocateDataNodes(fileInfo, ip + "-" + hostname);
                JSONObject replacement = JSONObject.parseObject(reallocateDataNode);
                if (replacement == null) {
                    // No replacement available: fail like the other "cannot allocate"
                    // paths instead of dereferencing null.
                    return false;
                }
                Host replacementHost = new Host(
                        replacement.getString("hostname"),
                        replacement.getString("ip"),
                        replacement.getIntValue("nioPort"));
                nioClient.sendFile(replacementHost, fileInfo, callback);
            }
        }
        return true;
    }

    /**
     * Downloads a file from a DataNode chosen by the NameNode.
     *
     * <p>If reading from the first DataNode fails, that node is reported to the
     * NameNode as {@code ip-hostname} and the read is retried once against the
     * node returned in response. When the retry also fails, its exception is
     * thrown with the first failure attached as a suppressed exception.
     *
     * @param fileInfo description of the file to download
     * @return the bytes returned by {@code NIOClient.readFile}
     * @throws IllegalStateException if the NameNode does not return a DataNode
     * @throws Exception if the RPC call fails or both read attempts fail
     */
    public byte[] download(FileInfo fileInfo) throws Exception {
        // 1. Ask the NameNode for a DataNode that can serve the file.
        String datanode = rpcClient.getDataNodeForFile(fileInfo.getFilename(), "");
        System.out.println("NameNode分配用来下载文件的数据节点：" + datanode);

        // 2. Parse the DataNode description.
        JSONObject jsonObject = JSONObject.parseObject(datanode);
        if (jsonObject == null) {
            throw new IllegalStateException(
                    "NameNode returned no DataNode for file: " + fileInfo.getFilename());
        }
        String hostname = jsonObject.getString("hostname");
        String ip = jsonObject.getString("ip");
        Integer nioPort = jsonObject.getInteger("nioPort");
        Host host = new Host(hostname, ip, nioPort);

        // 3. Read the file over NIO, retrying once on another DataNode.
        try {
            return nioClient.readFile(host, fileInfo);
        } catch (Exception ex) {
            // Report the failed DataNode and ask for another one.
            datanode = rpcClient.getDataNodeForFile(fileInfo.getFilename(), ip + "-" + hostname);
            jsonObject = JSONObject.parseObject(datanode);
            if (jsonObject == null) {
                throw new IllegalStateException(
                        "NameNode returned no replacement DataNode for file: " + fileInfo.getFilename(), ex);
            }
            hostname = jsonObject.getString("hostname");
            ip = jsonObject.getString("ip");
            nioPort = jsonObject.getInteger("nioPort");
            try {
                return nioClient.readFile(new Host(hostname, ip, nioPort), fileInfo);
            } catch (Exception retryEx) {
                // Keep the first failure visible rather than dropping it.
                // The identity check avoids addSuppressed's self-suppression error.
                if (retryEx != ex) {
                    retryEx.addSuppressed(ex);
                }
                throw retryEx;
            }
        }
    }

    /**
     * Shuts down the NameNode RPC client. The NIO client is not touched here.
     *
     * @throws Exception if shutting down the RPC client fails
     */
    public void shutdown() throws Exception {
        rpcClient.shutdown();
    }
}
